{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "22/01/17 00:54:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.\n",
      "22/01/17 00:54:49 ERROR SparkContext: Error initializing SparkContext.\n",
      "java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.\n",
      "\tat sun.nio.ch.Net.bind0(Native Method)\n",
      "\tat sun.nio.ch.Net.bind(Net.java:433)\n",
      "\tat sun.nio.ch.Net.bind(Net.java:425)\n",
      "\tat sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)\n",
      "\tat io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:134)\n",
      "\tat io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:562)\n",
      "\tat io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)\n",
      "\tat io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)\n",
      "\tat io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)\n",
      "\tat io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)\n",
      "\tat io.netty.channel.AbstractChannel.bind(AbstractChannel.java:260)\n",
      "\tat io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:356)\n",
      "\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)\n",
      "\tat io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)\n",
      "\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)\n",
      "\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)\n",
      "\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n",
      "\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n",
      "\tat java.lang.Thread.run(Thread.java:748)\n"
     ]
    },
    {
     "ename": "Py4JJavaError",
     "evalue": "An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.\n: java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.\n\tat sun.nio.ch.Net.bind0(Native Method)\n\tat sun.nio.ch.Net.bind(Net.java:433)\n\tat sun.nio.ch.Net.bind(Net.java:425)\n\tat sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)\n\tat io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:134)\n\tat io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:562)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)\n\tat io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)\n\tat io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)\n\tat io.netty.channel.AbstractChannel.bind(AbstractChannel.java:260)\n\tat io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:356)\n\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.lang.Thread.run(Thread.java:748)\n",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mPy4JJavaError\u001b[0m                             Traceback (most recent call last)",
      "\u001b[0;32m/var/folders/24/8j02b1nd69g0r8n15sc8n38w0000gn/T/ipykernel_73166/2411192990.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m      7\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      8\u001b[0m \u001b[0;31m# 使用pyspark实现wordcount\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 9\u001b[0;31m \u001b[0msc\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mSparkContext\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"local\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"word_count\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     10\u001b[0m \u001b[0mtxt\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0msc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtextFile\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"dataset/txt.txt\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     11\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/Library/Python/3.7/lib/python/site-packages/pyspark/context.py\u001b[0m in \u001b[0;36m__init__\u001b[0;34m(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)\u001b[0m\n\u001b[1;32m    145\u001b[0m         \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    146\u001b[0m             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,\n\u001b[0;32m--> 147\u001b[0;31m                           conf, jsc, profiler_cls)\n\u001b[0m\u001b[1;32m    148\u001b[0m         \u001b[0;32mexcept\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    149\u001b[0m             \u001b[0;31m# If an error occurs, clean up in order to allow future SparkContext creation:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/Library/Python/3.7/lib/python/site-packages/pyspark/context.py\u001b[0m in \u001b[0;36m_do_init\u001b[0;34m(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls)\u001b[0m\n\u001b[1;32m    207\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    208\u001b[0m         \u001b[0;31m# Create the Java SparkContext through Py4J\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 209\u001b[0;31m         \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jsc\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mjsc\u001b[0m \u001b[0;32mor\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_initialize_context\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_conf\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jconf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    210\u001b[0m         \u001b[0;31m# Reset the SparkConf to the one actually used by the SparkContext in JVM.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    211\u001b[0m         \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_conf\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mSparkConf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_jconf\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jsc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mconf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/Library/Python/3.7/lib/python/site-packages/pyspark/context.py\u001b[0m in \u001b[0;36m_initialize_context\u001b[0;34m(self, jconf)\u001b[0m\n\u001b[1;32m    327\u001b[0m         \u001b[0mInitialize\u001b[0m \u001b[0mSparkContext\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mfunction\u001b[0m \u001b[0mto\u001b[0m \u001b[0mallow\u001b[0m \u001b[0msubclass\u001b[0m \u001b[0mspecific\u001b[0m \u001b[0minitialization\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    328\u001b[0m         \"\"\"\n\u001b[0;32m--> 329\u001b[0;31m         \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mJavaSparkContext\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mjconf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    330\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    331\u001b[0m     \u001b[0;34m@\u001b[0m\u001b[0mclassmethod\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/Library/Python/3.7/lib/python/site-packages/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m   1572\u001b[0m         \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_gateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m   1573\u001b[0m         return_value = get_return_value(\n\u001b[0;32m-> 1574\u001b[0;31m             answer, self._gateway_client, None, self._fqn)\n\u001b[0m\u001b[1;32m   1575\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m   1576\u001b[0m         \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/Library/Python/3.7/lib/python/site-packages/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m    326\u001b[0m                 raise Py4JJavaError(\n\u001b[1;32m    327\u001b[0m                     \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 328\u001b[0;31m                     format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m    329\u001b[0m             \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    330\u001b[0m                 raise Py4JError(\n",
      "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.\n: java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.\n\tat sun.nio.ch.Net.bind0(Native Method)\n\tat sun.nio.ch.Net.bind(Net.java:433)\n\tat sun.nio.ch.Net.bind(Net.java:425)\n\tat sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)\n\tat io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:134)\n\tat io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:562)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)\n\tat io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)\n\tat io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)\n\tat io.netty.channel.AbstractChannel.bind(AbstractChannel.java:260)\n\tat io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:356)\n\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.lang.Thread.run(Thread.java:748)\n"
     ]
    }
   ],
   "source": [
    "# JAVA_HOME\n",
    "# SPARK_HOME\n",
    "# HADOOP_HOME\n",
    "# PYSPARK_PYTHON\n",
    "\n",
    "from pyspark import SparkContext\n",
    "\n",
    "# 使用pyspark实现wordcount\n",
    "sc = SparkContext(\"local\", \"word_count\")\n",
    "txt = sc.textFile(\"dataset/txt.txt\")\n",
    "\n",
    "line = txt.flatMap(lambda x: x.split(\" \"))\n",
    "word = line.map(lambda y: (y, 1))\n",
    "\n",
    "rdd = word.reduceByKey(lambda x, y: x + y)\n",
    "rdd.foreach(print)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "interpreter": {
   "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
  },
  "kernelspec": {
   "display_name": "Python 3.7.3 64-bit",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
